package org.example.launch
import org.example.constant.ApolloConst
import org.example.dao.MysqlConfig
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.search.builder.SearchSourceBuilder
import org.elasticsearch.spark.sql.EsSparkSQL
import org.example.utils.{CommonUtils, MysqlUtil}

import java.sql.ResultSet

/**
 * Pushes alarm-detail data-integrity metrics to Elasticsearch.
 *
 * Joins Hive vehicle/company dimensions with today's alarm records and
 * attachments from ES, enriches with the MySQL enterprise registry, and
 * writes the result to the `alarm_data_integrity_index` ES index.
 */
object AlarmDataIntegrityIndex {

  /**
   * Builds the hourly alarm data-integrity DataFrame.
   *
   * Pipeline:
   *  1. Hive: operating vehicles (passenger / dangerous goods / general cargo,
   *     tagged with industry_type codes 1011/1010/1009) inner-joined with
   *     companies whose status is "营业" (in business).
   *  2. ES: today's alarms (`warninfo_index`) left-joined with their
   *     attachments (`warn_attach_index`); counts image (*.gif) and video
   *     (*.mpv) files per alarm and decodes the plate-color code.
   *  3. MySQL: left-join on company name against the enterprise registry to
   *     attach the social credit code.
   *
   * @param sparkSession active session used to run all SQL and to read from
   *                     ES/MySQL; ES connection settings are expected to be
   *                     configured on it externally
   * @return DataFrame with columns alarmId, alarmTime, enterpriseCode, id,
   *         imgCount, social_credit_code, videoCount, typeCode
   */
  def getAlarmDataIntegrityIndexByHour(sparkSession: SparkSession): DataFrame = {
    // ****************************** Hive side ************************************
    // Operating vehicles of the three regulated industry types, restricted to
    // companies currently in business. No interpolation needed, so plain """.
    val hiveSql =
      """
         |SELECT
         |t2.industry_type,t1.business_owner_id,t1.business_owner_name,t2.vehicle_number,t2.vehicle_plate_color
         |FROM
         |(
         |select '1011' industry_type,vehicle_id,business_owner_id,vehicle_number,vehicle_plate_color from dwd.dwd_yz_vehicle_info where vecent_type = '客车' and operation_state = '营运'
         |union all
         |select '1010' industry_type,vehicle_id,business_owner_id,vehicle_number,vehicle_plate_color from dwd.dwd_yz_vehicle_info where vecent_type = '危货' and operation_state = '营运'
         |union all
         |select '1009' industry_type,vehicle_id,business_owner_id,vehicle_number,vehicle_plate_color from dwd.dwd_yz_vehicle_info where vecent_type = '普货' and operation_state = '营运'
         |) as t2 inner join
         |(
         |select business_owner_id,business_owner_name from dwd.dwd_yz_company_info where business_owner_status = '营业'
         |) as t1
         |on t1.business_owner_id=t2.business_owner_id""".stripMargin
    val hiveDataFrame = sparkSession.sql(hiveSql)
    // Register the Hive result as a temp view for the final join below.
    hiveDataFrame.createOrReplaceTempView("yz_company_info")

    // ******************************* ES side *************************************
    // Build an unrestricted match-all query; date filtering is done in Spark SQL.
    val builder: SearchSourceBuilder = new SearchSourceBuilder() // query container
    //val queryBuilder = QueryBuilders.boolQuery() // (kept for reference: filtered query variant)
    val queryBuilder = QueryBuilders.matchAllQuery() // no filter conditions
    builder.query(queryBuilder) // attach the rule to the query container
    val esQuery = builder.toString // serialize the query to its JSON string form
    val warnInfo_indexFrame = EsSparkSQL.esDF(sparkSession, "warninfo_index/_doc", esQuery)
    warnInfo_indexFrame.createOrReplaceTempView("warninfo_index")
    val warn_attach_indexFrame = EsSparkSQL.esDF(sparkSession, "warn_attach_index/_doc", esQuery)
    warn_attach_indexFrame.createOrReplaceTempView("warn_attach_index")
    // Today's alarms joined with their attachments; per-alarm counts of image
    // and video files, plate-color code decoded to its Chinese label.
    val esSql =
      """
         |select a.id,
         |a.datetime,
         |a.vehicleNo,
         |(case when a.vehicleColor='1' then '蓝色'
         |when a.vehicleColor='2' then '黄色'
         |when a.vehicleColor='3' then '黑色'
         |when a.vehicleColor='4' then '白色'
         |when a.vehicleColor='5' then '绿色'
         |when a.vehicleColor='9' then '其它'
         |when a.vehicleColor='96' then '渐变绿色'
         |when a.vehicleColor='97' then '黄绿双拼色'end) vehicleColor,
         |sum(case when b.filePath like '%.gif' then 1 else 0 end) imgCount,
         |sum(case when b.filePath like '%.mpv' then 1 else 0 end) videoCount
         |from
         |(
         |select * from warninfo_index where from_unixtime(datetime,'yyyy-MM-dd')=current_date()
         |) a
         |left join
         |(
         |select filePath,vehicleNo,vehicleColor,INFO_ID from warn_attach_index where from_unixtime(warnTime,'yyyy-MM-dd')=current_date()
         |) b
         |on a.vehicleNo=b.vehicleNo and a.vehicleColor=b.vehicleColor and a.infoId=b.INFO_ID
         |group by a.id,a.datetime,a.vehicleNo,a.vehicleColor
      """.stripMargin
    sparkSession.sql(esSql).createOrReplaceTempView("alarm_data")

    // ******************************* MySQL side **********************************
    // NOTE(review): mysqlConfig is constructed but never passed to MysqlUtil —
    // presumably MysqlUtil reads its connection settings from elsewhere
    // (e.g. ApolloConst). Confirm before removing.
    val mysqlConfig = MysqlConfig("jdbc", ApolloConst.jgdMysqlURL, "", ApolloConst.jgdMysqlUserName, ApolloConst.jgdMysqlPassWord, "")
    // Fixed: first line now carries the '|' margin so stripMargin produces a
    // clean statement (previously it kept the raw leading indentation).
    val mysql =
      """
         |select id,enterprise_name,social_credit_code from zcov.base_into_enterprise_info
         |""".stripMargin
    val set_veh: ResultSet = MysqlUtil.getMysqlQueryResult(mysql)
    val DF_veh: DataFrame = MysqlUtil.resultSetToDataframe(set_veh, sparkSession).toDF("id", "enterprise_name", "social_credit_code")
    // Register the MySQL enterprise registry as a temp view.
    DF_veh.createOrReplaceTempView("base_into_enterprise_info")

    // Final join: vehicle/company dimensions x alarm counts, enriched with the
    // enterprise registry (matched by company name).
    val resultSql =
      """
         |select t1.id alarmId,t1.datetime as alarmTime,t1.business_owner_id as enterpriseCode,t2.id,t1.imgCount,t2.social_credit_code,t1.videoCount,t1.industry_type as typeCode
         |from
         |(
         |select b.id,b.datetime,a.business_owner_id,a.business_owner_name,a.industry_type,b.imgCount,b.videoCount
         |from yz_company_info a inner join alarm_data b
         |on a.vehicle_number=b.vehicleNo and a.vehicle_plate_color=b.vehicleColor
         |) t1
         |left join
         |base_into_enterprise_info t2 on t1.business_owner_name=t2.enterprise_name""".stripMargin
    val dataFrame = sparkSession.sql(resultSql)
    dataFrame.show(10, truncate = false) // debug/audit output kept intentionally
    dataFrame
  }

  /**
   * Entry point: builds the hourly integrity DataFrame and persists it to the
   * `alarm_data_integrity_index` ES index.
   */
  def main(args: Array[String]): Unit = {
    val sparkSession = CommonUtils.getSparkSession()
    EsSparkSQL.saveToEs(getAlarmDataIntegrityIndexByHour(sparkSession), "alarm_data_integrity_index/_doc")
  }
}
